package org.west.sky.scripture.imports.core.websocket;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Static registry of open WebSocket sessions and a helper for pushing text messages to them.
 *
 * <p>Sessions are grouped under a composite session key of the form
 * {@code business-subclass-key} (see {@link #putKey(String, String, String)}); several sessions
 * may be registered under the same key. The three key parts are read from the session attributes
 * {@code WEBSOCKET_BUSINESS}, {@code WEBSOCKET_SUBCLASS} and {@code WEBSOCKET_KEY}.
 *
 * <p>Thread-safety: registration and removal are serialized on the class monitor; lookups and
 * sends read the concurrent maps without locking. All sends run on one shared background thread.
 *
 * @author: chz
 * @date: 2023/4/19
 */
@Slf4j
public class WebSocketManager {

    /**
     * Business category value {@code "taskCenter"}.
     */
    public static final String BUSINESS_TASK_CENTER = "taskCenter";
    /**
     * Sub-category value {@code "process"}: task execution progress.
     */
    public static final String TASK_CENTER_PROCESS = "process";
    /**
     * Sub-category value {@code "notice"}: task notifications.
     */
    public static final String TASK_CENTER_NOTICE = "notice";

    /**
     * Session attribute holding the business category.
     */
    private static final String ATTR_BUSINESS = "WEBSOCKET_BUSINESS";
    /**
     * Session attribute holding the business sub-category.
     */
    private static final String ATTR_SUBCLASS = "WEBSOCKET_SUBCLASS";
    /**
     * Session attribute holding the unique business identifier.
     */
    private static final String ATTR_KEY = "WEBSOCKET_KEY";

    /**
     * Registered sessions: session key -> (session id -> session).
     * A key is only present while at least one session is registered under it.
     */
    private static final ConcurrentHashMap<String, Map<String, WebSocketSession>> CONNECTION = new ConcurrentHashMap<>(16);

    /**
     * Single shared worker for outgoing messages. One thread keeps sends off the caller's thread,
     * delivers messages in submission order and never writes to a session from two threads at
     * once. The thread is a daemon so it cannot keep the JVM alive on shutdown.
     */
    private static final ExecutorService SENDER = Executors.newSingleThreadExecutor(task -> {
        Thread worker = new Thread(task, "websocket-manager-sender");
        worker.setDaemon(true);
        return worker;
    });

    /**
     * Registers a newly connected session under the session key derived from its attributes.
     * Registering the same session id twice keeps the first registration.
     *
     * @param session the session that has just been established
     * @throws NullPointerException if one of the required session attributes is missing
     */
    public static synchronized void initUsers(WebSocketSession session) {
        String sessionKey = sessionKeyOf(session);
        Map<String, WebSocketSession> sessionMap =
                CONNECTION.computeIfAbsent(sessionKey, ignored -> new ConcurrentHashMap<>(16));
        sessionMap.putIfAbsent(session.getId(), session);
        log.debug("sessionKey:{},sessionId:{}连接成功，当前用户共存在{}个连接", sessionKey, session.getId(), sessionMap.size());
    }

    /**
     * Unregisters a session that has been closed. When the last session of a key is removed the
     * key itself is dropped, so {@link #connected(String, String, String)} stops reporting it.
     *
     * @param session the session that has been closed
     * @throws NullPointerException if one of the required session attributes is missing
     */
    public static synchronized void removeSession(WebSocketSession session) {
        String sessionKey = sessionKeyOf(session);
        Map<String, WebSocketSession> sessionMap = CONNECTION.get(sessionKey);
        if (sessionMap == null) {
            log.debug("检测到关闭session异常,程序中断处理,系统中未找到对应的session,key={}", sessionKey);
            return;
        }
        sessionMap.remove(session.getId());
        log.debug("sessionKey:{},sessionId:{}关闭成功，当前用户共存在{}个连接", sessionKey, session.getId(), sessionMap.size());
        if (sessionMap.isEmpty()) {
            // Safe without further locking: initUsers is synchronized on the same monitor, so no
            // session can be added to this map between the emptiness check and the removal.
            CONNECTION.remove(sessionKey);
        }
    }

    /**
     * Asynchronously sends a text message to every session registered under the given key.
     * The call returns immediately; delivery is best-effort and failures are only logged.
     *
     * @param sessionKey composite key as produced by {@link #putKey(String, String, String)}
     * @param msg        text payload to send
     */
    public static void sendMsg(String sessionKey, String msg) {
        if (sessionKey == null || msg == null) {
            // Neither the registry lookup nor TextMessage accepts null; report instead of failing
            // silently on the worker thread.
            log.warn("skip websocket send: sessionKey={}, msg={}", sessionKey, msg);
            return;
        }
        SENDER.execute(() -> deliver(sessionKey, msg));
    }

    /**
     * Asynchronously sends a text message to every session registered for the given key parts.
     *
     * @param business business category
     * @param subclass business sub-category
     * @param key      unique business identifier
     * @param msg      text payload to send
     */
    public static void sendMsg(String business, String subclass, String key, String msg) {
        sendMsg(putKey(business, subclass, key), msg);
    }

    /**
     * Builds the composite session key {@code business-subclass-key}.
     *
     * @param business business category
     * @param subclass business sub-category
     * @param key      unique business identifier
     * @return the three parts joined with {@code "-"}
     * @throws NullPointerException if any argument is {@code null}
     */
    public static String putKey(String business, String subclass, String key) {
        return business.concat("-").concat(subclass).concat("-").concat(key);
    }

    /**
     * Reports whether at least one session is currently registered for the given key parts.
     *
     * @param business business category
     * @param subclass business sub-category
     * @param key      unique business identifier
     * @return {@code true} if a session is registered under the resulting session key
     */
    public static boolean connected(String business, String subclass, String key) {
        Map<String, WebSocketSession> sessionMap = CONNECTION.get(putKey(business, subclass, key));
        return sessionMap != null && !sessionMap.isEmpty();
    }

    /**
     * Sends the message to each open session under the key; runs on the sender thread.
     */
    private static void deliver(String sessionKey, String msg) {
        // Single lookup: the key may be removed concurrently, so never pair containsKey with get.
        Map<String, WebSocketSession> sessionMap = CONNECTION.get(sessionKey);
        if (sessionMap == null) {
            return;
        }
        log.debug("开始发送数据:sessionKey={},消息内容:msg={}", sessionKey, msg);
        TextMessage message = new TextMessage(msg);
        sessionMap.forEach((sessionId, session) -> {
            if (!session.isOpen()) {
                // Closed sessions are cleaned up by removeSession; just skip them here.
                return;
            }
            try {
                log.debug("向sessionId:{}发送数据", sessionId);
                session.sendMessage(message);
            } catch (IOException | RuntimeException e) {
                // Handle the failure per session so one broken connection does not prevent
                // delivery to the remaining sessions.
                log.warn("发生了异常：{}", e.getMessage(), e);
            }
        });
    }

    /**
     * Derives the composite session key from the attributes stored on the session.
     */
    private static String sessionKeyOf(WebSocketSession session) {
        Map<String, Object> attributes = session.getAttributes();
        return putKey(
                requiredAttribute(attributes, ATTR_BUSINESS),
                requiredAttribute(attributes, ATTR_SUBCLASS),
                requiredAttribute(attributes, ATTR_KEY));
    }

    /**
     * Returns the string form of a mandatory session attribute, failing with a descriptive message.
     */
    private static String requiredAttribute(Map<String, Object> attributes, String name) {
        return Objects.requireNonNull(attributes.get(name),
                "missing WebSocket session attribute: " + name).toString();
    }

}
